frollapply rewritten, by.column=F, parallel, any type #7272
Conversation
Codecov Report: ✅ All modified and coverable lines are covered by tests.

@@            Coverage Diff             @@
##           master    #7272      +/-   ##
==========================================
+ Coverage   99.01%   99.07%   +0.05%
==========================================
  Files          81       83       +2
  Lines       15503    15662     +159
==========================================
+ Hits        15351    15517     +166
+ Misses        152      145       -7
Generated via commit ea19cf5. Download link for the artifact containing the test results: atime-results.zip
aitap left a comment:
Currently, most concerning are the covr failures. Interesting that the builds started failing after adding Suggests: parallel, because that's how covr decides whether to apply fixes for parallel:::mcexit. Not yet sure what could be broken as a result.
}
if (by.column) {
  allocWindow = function(x, n) x[seq_len(n)]
  tight = function(i, dest, src, n) FUN(.Call(CmemcpyVector, dest, src, i, n), ...)
Here's how the sharing problem (`setDTthreads(1); frollapply(c(1, 9), N=1L, FUN=identity)`) can be avoided for the `!adaptive && by.column` case:

- Hide the newly-allocated window vector inside a list to control its reference count. The `w` will be referenced by `frollapply` (+1) and by `lapply` (+1) for a total reference count of 2 (and we can't count higher than that), but `w[[1]]` must stay with a reference count of 1 (only referenced by `w`). This needs the raw `Ccopy` (i.e. `duplicate()`), not the R-level `copy` (with its special processing for lists), to work.
- In `memcpyVector`, check whether `REFCNT(w[[1]])` is 1. If not, someone must have kept a reference to it, so repair the situation by allocating a new vector. Return the window itself, not the list container.
diff --git a/R/frollapply.R b/R/frollapply.R
index d1c30842..527208a6 100644
--- a/R/frollapply.R
+++ b/R/frollapply.R
@@ -213,7 +213,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
mask
}
if (by.column) {
- allocWindow = function(x, n) x[seq_len(n)]
+ allocWindow = function(x, n) list(x[seq_len(n)])
tight = function(i, dest, src, n) FUN(.Call(CmemcpyVector, dest, src, i, n), ...)
} else {
if (!list.df) {
@@ -306,7 +306,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
oldDTthreads = setDTthreads(1L) ## for consistency, anyway window size is unlikely to be big enough to benefit any parallelism
withCallingHandlers(
tryCatch(
- thisans <- lapply(ansi, FUN = tight, dest = cpy(w), src = thisx, n = thisn),
+ thisans <- lapply(ansi, FUN = tight, dest = .Call(Ccopy, w), src = thisx, n = thisn),
error = function(e) h$err = conditionMessage(e)
), warning = function(w) {h$warn = c(h$warn, conditionMessage(w)); invokeRestart("muffleWarning")}
)
diff --git a/src/frollapply.c b/src/frollapply.c
index 3b07cca3..9b0e6483 100644
--- a/src/frollapply.c
+++ b/src/frollapply.c
@@ -34,10 +34,14 @@ default: internal_error(__func__, "column type not supported in memcpyVector or
*/
SEXP memcpyVector(SEXP dest, SEXP src, SEXP offset, SEXP size) {
size_t o = INTEGER(offset)[0] - INTEGER(size)[0];
- int nrow = LENGTH(dest);
- SEXP d = dest, s = src;
+ SEXP d = VECTOR_ELT(dest, 0), s = src;
+ if (MAYBE_SHARED(d)) {
+ d = duplicate(d); // something else besides 'dest' was referencing 'd'
+ SET_VECTOR_ELT(dest, 0, d);
+ }
+ int nrow = LENGTH(d);
MEMCPY
- return dest;
+ return d;
}
// # nocov start ## does not seem to be reported to codecov most likely due to running in a fork, I manually debugged that it is being called when running froll.Rraw
SEXP memcpyDT(SEXP dest, SEXP src, SEXP offset, SEXP size) {

This breaks tests 6010.011, 6010.012, 6010.025, 6010.026, 6010.027, but they are testing the currently broken behaviour. This may be worth trying to generalise to the other three(?) cases.
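As a side note, the reproducer mentioned at the top of this comment can be run directly; with the window buffer shared across iterations, `FUN=identity` presumably exposes the reuse because the returned elements refer to the same reused vector:

```r
# Reproducer quoted from the comment above; single-threaded so the window
# handling is deterministic.
library(data.table)
setDTthreads(1)
frollapply(c(1, 9), N = 1L, FUN = identity)
```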
Yes, that makes sense. The question is what the performance impact of such a change will be.
The R function `tight`, as well as the `memcpy*` C functions, have been designed to be critically light, as they have to run for every single observation.
Adding `VECTOR_ELT()` and `MAYBE_SHARED()` is probably not much, but called 1e6 times it can still add up.
Therefore I am thinking a more elegant solution could be to check this only for the first observation and, based on that, route the call to different branches (`memcpyVector` vs `memcpyVectorMaybeShared`). Branching would obviously have to happen outside of the `tight` function to avoid branching 1e6 times. WDYT?
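A rough R-level sketch of that idea, for illustration only (the helper name `choose_tight` and the `needs_copy` flag are invented here, not part of data.table): decide once which worker to use, then run the hot loop without any per-iteration branching.

```r
# Toy sketch: pick the per-observation worker once, up front, instead of
# re-checking a condition on each of the ~1e6 iterations.
choose_tight = function(FUN, needs_copy) {
  if (needs_copy)
    function(w, ...) FUN(w[seq_along(w)], ...)  # safe path: hand FUN a fresh copy of the window
  else
    function(w, ...) FUN(w, ...)                # fast path: reuse the buffer as-is
}
tight = choose_tight(sum, needs_copy = FALSE)   # decided once, outside the loop
vapply(1:5, function(i) tight(i:(i + 2)), numeric(1))
```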
Considering that
- it would be a significant change to the existing code (and documentation),
- the current code behaves as documented,
- the new behavior will be backward compatible,
- frollapply is declared as experimental at the beginning of its manual page,

I believe we could add this in a follow-up PR.
Performing a trial run (like empty in groupingsets) and choosing the tight function based on that sounds like an even better idea, thank you!
Thank you for the review. I addressed your feedback either with commits or with in-line replies.
Good find. For now I removed
I have a 6-core Windows machine on which I ran the timings, but I see different results.
#master
> system.time(frollapply(x, 500, first))
user system elapsed
2.86 0.04 2.92
> system.time(frollapply(x, 500, median))
user system elapsed
42.39 2.19 44.69
> system.time(frollapply(x, 500, first, simplify=unlist))
user system elapsed
12.36 0.77 13.15
> system.time(frollapply(x, 500, median, simplify=unlist))
user system elapsed
42.94 2.23 45.51
#this pr
> system.time(frollapply(x, 500, first))
user system elapsed
7.60 0.04 7.63
> system.time(frollapply(x, 500, median))
user system elapsed
50.20 2.34 52.61
> system.time(frollapply(x, 500, first, simplify=unlist))
user system elapsed
5.67 0.03 5.72
> system.time(frollapply(x, 500, median, simplify=unlist))
user system elapsed
49.17 2.30 51.48

3 timings are slower using this PR, and 1 is faster (FUN=first with simplify=unlist).
@tdhock on a Windows machine the new frollapply will be slower. That is because the previous frollapply did not support any input or output type other than double, so the result of each operation could be written directly into a single double answer vector. It can also be slower on Linux when the function to compute is fast and the code runs single-threaded. The new implementation supports any type, therefore the results of each iteration are written into list elements and then combined into the final answer afterwards, which adds overhead.
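To make the overhead concrete, here is a small self-contained comparison, illustrative only and not data.table code, of the two strategies described above: writing each result straight into a preallocated double vector versus collecting results in a list and simplifying afterwards.

```r
# Old-style: double-only results written in place vs
# new-style: any-type results collected in a list, then simplified.
x = runif(1e5)
f = function(v) v[1L]

direct = function() {
  ans = numeric(length(x))
  for (i in seq_along(x)) ans[i] = f(x[i])
  ans
}
generic = function() {
  ans = vector("list", length(x))
  for (i in seq_along(x)) ans[[i]] = f(x[i])
  unlist(ans)                      # extra pass and extra allocations
}
system.time(direct())
system.time(generic())
```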
A few comments: please fix the atime job to keep the PR in a ready-to-merge state.
OK, I removed the performance test.
Would anyone else like to have a look and submit a review before I merge?
I am merging this PR to unblock further development that depends on it. I am still very much looking forward to @aitap's responses to my comments. Anyone else is of course welcome to review even after the merge. If needed I will submit follow-up PR(s).

This PR completely rewrites the `frollapply()` function. It decouples `frollapply()` from the other common rolling functions, which redirect to optimized C routines.

The legacy C implementation re-used a once-allocated window across all iterations and then called `Rf_eval()` on the user-defined function supplied to `FUN`; therefore it could not be parallelized with OpenMP, as we normally do in data.table, because arbitrary R functions are not thread safe. Moreover, it supported only the `real` type on input and output, to match the input/output type of the C "optimized" rolling functions.

The implementation proposed here is written mostly in R. Like the legacy C implementation, it re-uses allocated memory across iterations. Additionally it uses multiple CPU threads, so iterations are computed in parallel (on decent OSes). Unlike any other multithreaded code in data.table to date, it uses the base R `parallel` package rather than OpenMP; OpenMP could not be used for the reason stated above. It now supports any atomic type as well as data.table/data.frame/list on input, and any type on output.
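For readers unfamiliar with the approach, here is a conceptual sketch only, not the actual data.table implementation (`roll_parallel` is an invented name), of how rolling-window iterations can be spread over workers with the base R `parallel` package, which sidesteps the thread-safety problem of evaluating arbitrary R functions from OpenMP threads.

```r
# Conceptual sketch: distribute full-window positions across a PSOCK cluster.
library(parallel)
roll_parallel = function(x, n, FUN, cores = 2L) {
  idx = seq.int(n, length(x))                    # positions with a complete window
  cl = makeCluster(cores)
  on.exit(stopCluster(cl))
  res = parLapply(cl, idx, function(i) FUN(x[(i - n + 1L):i]))
  c(rep(NA, n - 1L), unlist(res))                # pad leading incomplete windows, like fill=NA
}
roll_parallel(1:10, 3, sum)                      # c(NA, NA, 6, 9, ..., 27)
```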
Closes #4887, #7054.
Regarding PR reviews: any code formatting or code reorganization changes that do not fix an issue addressed in this PR I would prefer to put into a follow-up PR rather than here, as that will reduce git merge conflicts in the upcoming 3 PRs that build on top of this one.
This PR supersedes #5575.
PR vs master
Using 4 threads:
This work was supported by the Medical Research Council, UK [grant number 1573].